/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2019-2022, Magnus Edenhill
 *               2025, Confluent Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"

#include "rdkafka.h"

#include "../src/rdkafka_proto.h"
#include "../src/rdstring.h"
#include "../src/rdunittest.h"

#include <stdarg.h>


/**
 * @name Producer transaction tests using the mock cluster
 *
 */


static int allowed_error;
static int allowed_error_2;

/**
 * @brief Decide what error_cb's will cause the test to fail.
 */
static int
error_is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
        if (err == allowed_error || err == allowed_error_2 ||
            /* If transport errors are allowed then it is likely
             * that we'll also see ALL_BROKERS_DOWN. */
            (allowed_error == RD_KAFKA_RESP_ERR__TRANSPORT &&
             err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)) {
                TEST_SAY("Ignoring allowed error: %s: %s\n",
                         rd_kafka_err2name(err), reason);
                return 0;
        }
        return 1;
}


static rd_kafka_resp_err_t (*on_response_received_cb)(rd_kafka_t *rk,
                                                      int sockfd,
                                                      const char *brokername,
                                                      int32_t brokerid,
                                                      int16_t ApiKey,
                                                      int16_t ApiVersion,
                                                      int32_t CorrId,
                                                      size_t size,
                                                      int64_t rtt,
                                                      rd_kafka_resp_err_t err,
                                                      void *ic_opaque);

/**
 * @brief Simple on_response_received interceptor that simply calls the
 *        sub-test's on_response_received_cb function, if set.
 */
static rd_kafka_resp_err_t
on_response_received_trampoline(rd_kafka_t *rk,
                                int sockfd,
                                const char *brokername,
                                int32_t brokerid,
                                int16_t ApiKey,
                                int16_t ApiVersion,
                                int32_t CorrId,
                                size_t size,
                                int64_t rtt,
                                rd_kafka_resp_err_t err,
                                void *ic_opaque) {
        TEST_ASSERT(on_response_received_cb != NULL, "");
        return on_response_received_cb(rk, sockfd, brokername, brokerid, ApiKey,
                                       ApiVersion, CorrId, size, rtt, err,
                                       ic_opaque);
}


/**
 * @brief on_new interceptor to add an on_response_received interceptor.
 */
static rd_kafka_resp_err_t on_new_producer(rd_kafka_t *rk,
                                           const rd_kafka_conf_t *conf,
                                           void *ic_opaque,
                                           char *errstr,
                                           size_t errstr_size) {
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;

        if (on_response_received_cb)
                err = rd_kafka_interceptor_add_on_response_received(
                    rk, "on_response_received", on_response_received_trampoline,
                    ic_opaque);

        return err;
}


/**
 * @brief Create a transactional producer and a mock cluster.
 *
 * The var-arg list is a NULL-terminated list of
 * (const char *key, const char *value) config properties.
 *
 * Special keys:
 *   "on_response_received", "" - enable the on_response_received_cb
 *                                interceptor,
 *                                which must be assigned prior to
 *                                calling create_tnx_producer().
 */
static RD_SENTINEL rd_kafka_t *
create_txn_producer(rd_kafka_mock_cluster_t **mclusterp,
                    const char *transactional_id,
                    int broker_cnt,
                    ...) {
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        char numstr[8];
        va_list ap;
        const char *key;
        rd_bool_t add_interceptors = rd_false;

        rd_snprintf(numstr, sizeof(numstr), "%d", broker_cnt);

        test_conf_init(&conf, NULL, 60);

        test_conf_set(conf, "transactional.id", transactional_id);
        /* When mock brokers are set to down state they're still binding
         * the port, just not listening to it, which makes connection attempts
         * stall until socket.connection.setup.timeout.ms expires.
         * To speed up detection of brokers being down we reduce this timeout
         * to just a couple of seconds. */
        test_conf_set(conf, "socket.connection.setup.timeout.ms", "5000");
        /* Speed up reconnects */
        test_conf_set(conf, "reconnect.backoff.max.ms", "2000");
        test_conf_set(conf, "test.mock.num.brokers", numstr);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

        test_curr->ignore_dr_err = rd_false;

        va_start(ap, broker_cnt);
        while ((key = va_arg(ap, const char *))) {
                if (!strcmp(key, "on_response_received")) {
                        add_interceptors = rd_true;
                        (void)va_arg(ap, const char *);
                } else {
                        test_conf_set(conf, key, va_arg(ap, const char *));
                }
        }
        va_end(ap);

        /* Add an on_.. interceptors */
        if (add_interceptors)
                rd_kafka_conf_interceptor_add_on_new(conf, "on_new_producer",
                                                     on_new_producer, NULL);

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        if (mclusterp) {
                *mclusterp = rd_kafka_handle_mock_cluster(rk);
                TEST_ASSERT(*mclusterp, "failed to create mock cluster");

                /* Create some of the common consumer "input" topics
                 * that we must be able to commit to with
                 * send_offsets_to_transaction().
                 * The number depicts the number of partitions in the topic. */
                TEST_CALL_ERR__(
                    rd_kafka_mock_topic_create(*mclusterp, "srctopic4", 4, 1));
                TEST_CALL_ERR__(rd_kafka_mock_topic_create(
                    *mclusterp, "srctopic64", 64, 1));
        }

        return rk;
}


/**
 * @brief Test recoverable errors using mock broker error injections
 *        and code coverage checks.
 */
static void do_test_txn_recoverable_errors(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        const char *groupid = "myGroupId";
        const char *txnid   = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
                                 NULL);

        /* Make sure transaction and group coordinators are different.
         * This verifies that AddOffsetsToTxnRequest isn't sent to the
         * transaction coordinator but the group coordinator. */
        rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 2);

        /*
         * Inject som InitProducerId errors that causes retries
         */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_InitProducerId, 3,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
            RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
            RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        (void)RD_UT_COVERAGE_CHECK(0); /* idemp_request_pid_failed(retry) */
        (void)RD_UT_COVERAGE_CHECK(1); /* txn_idemp_state_change(READY) */

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        rd_kafka_flush(rk, -1);

        /*
         * Produce a message, let it fail with a non-idempo/non-txn
         * retryable error
         */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS);

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* Make sure messages are produced */
        rd_kafka_flush(rk, -1);

        /*
         * Send some arbitrary offsets, first with some failures, then
         * succeed.
         */
        offsets = rd_kafka_topic_partition_list_new(4);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 39)->offset =
            999999111;
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset =
            999;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 19)->offset =
            123456789;

        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_AddPartitionsToTxn, 1,
            RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);

        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_TxnOffsetCommit, 2,
            RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        /*
         * Commit transaction, first with som failures, then succeed.
         */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_EndTxn, 3,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
            RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
            RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief KIP-360: Test that fatal idempotence errors triggers abortable
 *        transaction errors and that the producer can recover.
 */
static void do_test_txn_fatal_idempo_errors(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        const char *txnid = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
                                 NULL);

        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb   = error_is_fatal_cb;
        allowed_error            = RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* Produce a message, let it fail with a fatal idempo error. */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* Commit the transaction, should fail */
        error = rd_kafka_commit_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort the transaction */
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /* Run a new transaction without errors to verify that the
         * producer can recover. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}


/**
 * @brief KIP-360: Test that fatal idempotence errors triggers abortable
 *        transaction errors, but let the broker-side bumping of the
 *        producer PID take longer than the remaining transaction timeout
 *        which should raise a retriable error from abort_transaction().
 *
 * @param with_sleep After the first abort sleep longer than it takes to
 *                   re-init the pid so that the internal state automatically
 *                   transitions.
 */
static void do_test_txn_slow_reinit(rd_bool_t with_sleep) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 2;
        const char *txnid = "myTxnId";
        test_timing_t timing;

        SUB_TEST("%s sleep", with_sleep ? "with" : "without");

        rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
                                 NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb   = NULL;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Set transaction coordinator latency higher than
         * the abort_transaction() call timeout so that the automatic
         * re-initpid takes longer than abort_transaction(). */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1,
            RD_KAFKA_RESP_ERR_NO_ERROR, 10000 /*10s*/);

        /* Produce a message, let it fail with a fatal idempo error. */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));


        /* Commit the transaction, should fail */
        TIMING_START(&timing, "commit_transaction(-1)");
        error = rd_kafka_commit_transaction(rk, -1);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort the transaction, should fail with retriable (timeout) error */
        TIMING_START(&timing, "abort_transaction(100)");
        error = rd_kafka_abort_transaction(rk, 100);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY("First abort_transaction() failed: %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_is_retriable(error),
                    "Expected retriable error");
        rd_kafka_error_destroy(error);

        if (with_sleep)
                rd_sleep(12);

        /* Retry abort, should now finish. */
        TEST_SAY("Retrying abort\n");
        TIMING_START(&timing, "abort_transaction(-1)");
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
        TIMING_STOP(&timing);

        /* Run a new transaction without errors to verify that the
         * producer can recover. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}



/**
 * @brief KIP-360: Test that fatal idempotence errors triggers abortable
 *        transaction errors, but let the broker-side bumping of the
 *        producer PID fail with a fencing error.
 *        Should raise a fatal error.
 *
 * @param error_code Which error code InitProducerIdRequest should fail with.
 *                   Either RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH (older)
 *                   or RD_KAFKA_RESP_ERR_PRODUCER_FENCED (newer).
 */
static void do_test_txn_fenced_reinit(rd_kafka_resp_err_t error_code) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 2;
        const char *txnid = "myTxnId";
        char errstr[512];
        rd_kafka_resp_err_t fatal_err;

        SUB_TEST_QUICK("With error %s", rd_kafka_err2name(error_code));

        rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
                                 NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb   = error_is_fatal_cb;
        allowed_error            = RD_KAFKA_RESP_ERR__FENCED;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Fail the PID reinit */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1, error_code, 0);

        /* Produce a message, let it fail with a fatal idempo error. */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Abort the transaction, should fail with a fatal error */
        error = rd_kafka_abort_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY("abort_transaction() failed: %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_is_fatal(error), "Expected a fatal error");
        rd_kafka_error_destroy(error);

        fatal_err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
        TEST_ASSERT(fatal_err, "Expected a fatal error to have been raised");
        TEST_SAY("Fatal error: %s: %s\n", rd_kafka_err2name(fatal_err), errstr);

        /* All done */

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}


/**
 * @brief Test EndTxn errors.
 */
static void do_test_txn_endtxn_errors(void) {
        rd_kafka_t *rk                    = NULL;
        rd_kafka_mock_cluster_t *mcluster = NULL;
        rd_kafka_resp_err_t err;
        struct {
                size_t error_cnt;
                rd_kafka_resp_err_t errors[4];
                rd_kafka_resp_err_t exp_err;
                rd_bool_t exp_retriable;
                rd_bool_t exp_abortable;
                rd_bool_t exp_fatal;
                rd_bool_t exp_successful_abort;
        } scenario[] = {
            /* This list of errors is from the EndTxnResponse handler in
             * AK clients/.../TransactionManager.java */
            {
                /* #0 */
                2,
                {RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
                 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE},
                /* Should auto-recover */
                RD_KAFKA_RESP_ERR_NO_ERROR,
            },
            {
                /* #1 */
                2,
                {RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR},
                /* Should auto-recover */
                RD_KAFKA_RESP_ERR_NO_ERROR,
            },
            {
                /* #2 */
                1,
                {RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS},
                /* Should auto-recover */
                RD_KAFKA_RESP_ERR_NO_ERROR,
            },
            {
                /* #3 */
                3,
                {RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS},
                /* Should auto-recover */
                RD_KAFKA_RESP_ERR_NO_ERROR,
            },
            {
                /* #4: the abort is auto-recovering thru epoch bump */
                1,
                {RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID},
                RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
                rd_false /* !retriable */,
                rd_true /* abortable */,
                rd_false /* !fatal */,
                rd_true /* successful abort */
            },
            {
                /* #5: the abort is auto-recovering thru epoch bump */
                1,
                {RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING},
                RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
                rd_false /* !retriable */,
                rd_true /* abortable */,
                rd_false /* !fatal */,
                rd_true /* successful abort */
            },
            {
                /* #6 */
                1,
                {RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH},
                /* This error is normalized */
                RD_KAFKA_RESP_ERR__FENCED,
                rd_false /* !retriable */,
                rd_false /* !abortable */,
                rd_true /* fatal */
            },
            {
                /* #7 */
                1,
                {RD_KAFKA_RESP_ERR_PRODUCER_FENCED},
                /* This error is normalized */
                RD_KAFKA_RESP_ERR__FENCED,
                rd_false /* !retriable */,
                rd_false /* !abortable */,
                rd_true /* fatal */
            },
            {
                /* #8 */
                1,
                {RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED},
                RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
                rd_false /* !retriable */,
                rd_false /* !abortable */,
                rd_true /* fatal */
            },
            {
                /* #9 */
                1,
                {RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED},
                RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
                rd_false /* !retriable */,
                rd_true /* abortable */,
                rd_false /* !fatal */
            },
            {
                /* #10 */
                /* Any other error should raise a fatal error */
                1,
                {RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE},
                RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
                rd_false /* !retriable */,
                rd_true /* abortable */,
                rd_false /* !fatal */,
            },
            {
                /* #11 */
                1,
                {RD_KAFKA_RESP_ERR_PRODUCER_FENCED},
                /* This error is normalized */
                RD_KAFKA_RESP_ERR__FENCED,
                rd_false /* !retriable */,
                rd_false /* !abortable */,
                rd_true /* fatal */
            },
            {0},
        };
        int i;

        SUB_TEST_QUICK();

        for (i = 0; scenario[i].error_cnt > 0; i++) {
                int j;
                /* For each scenario, test:
                 *   commit_transaction()
                 *   flush() + commit_transaction()
                 *   abort_transaction()
                 *   flush() + abort_transaction()
                 */
                for (j = 0; j < (2 + 2); j++) {
                        rd_bool_t commit     = j < 2;
                        rd_bool_t with_flush = j & 1;
                        rd_bool_t exp_successful_abort =
                            !commit && scenario[i].exp_successful_abort;
                        const char *commit_str =
                            commit ? (with_flush ? "commit&flush" : "commit")
                                   : (with_flush ? "abort&flush" : "abort");
                        rd_kafka_topic_partition_list_t *offsets;
                        rd_kafka_consumer_group_metadata_t *cgmetadata;
                        rd_kafka_error_t *error;
                        test_timing_t t_call;

                        TEST_SAY("Testing scenario #%d %s with %" PRIusz
                                 " injected erorrs, expecting %s\n",
                                 i, commit_str, scenario[i].error_cnt,
                                 exp_successful_abort
                                     ? "successful abort"
                                     : rd_kafka_err2name(scenario[i].exp_err));

                        if (!rk) {
                                const char *txnid = "myTxnId";
                                rk = create_txn_producer(&mcluster, txnid, 3,
                                                         NULL);
                                TEST_CALL_ERROR__(
                                    rd_kafka_init_transactions(rk, 5000));
                        }

                        /*
                         * Start transaction
                         */
                        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                        /* Transaction aborts will cause DR errors:
                         * ignore them. */
                        test_curr->ignore_dr_err = !commit;

                        /*
                         * Produce a message.
                         */
                        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                                RD_KAFKA_V_VALUE("hi", 2),
                                                RD_KAFKA_V_END);
                        TEST_ASSERT(!err, "produce failed: %s",
                                    rd_kafka_err2str(err));

                        if (with_flush)
                                test_flush(rk, -1);

                        /*
                         * Send some arbitrary offsets.
                         */
                        offsets = rd_kafka_topic_partition_list_new(4);
                        rd_kafka_topic_partition_list_add(offsets, "srctopic4",
                                                          3)
                            ->offset = 12;
                        rd_kafka_topic_partition_list_add(offsets, "srctopic64",
                                                          60)
                            ->offset = 99999;

                        cgmetadata =
                            rd_kafka_consumer_group_metadata_new("mygroupid");

                        TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
                            rk, offsets, cgmetadata, -1));

                        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
                        rd_kafka_topic_partition_list_destroy(offsets);

                        /*
                         * Commit transaction, first with som failures,
                         * then succeed.
                         */
                        rd_kafka_mock_push_request_errors_array(
                            mcluster, RD_KAFKAP_EndTxn, scenario[i].error_cnt,
                            scenario[i].errors);

                        TIMING_START(&t_call, "%s", commit_str);
                        if (commit)
                                error = rd_kafka_commit_transaction(
                                    rk, tmout_multip(5000));
                        else
                                error = rd_kafka_abort_transaction(
                                    rk, tmout_multip(5000));
                        TIMING_STOP(&t_call);

                        if (error)
                                TEST_SAY(
                                    "Scenario #%d %s failed: %s: %s "
                                    "(retriable=%s, req_abort=%s, "
                                    "fatal=%s)\n",
                                    i, commit_str, rd_kafka_error_name(error),
                                    rd_kafka_error_string(error),
                                    RD_STR_ToF(
                                        rd_kafka_error_is_retriable(error)),
                                    RD_STR_ToF(
                                        rd_kafka_error_txn_requires_abort(
                                            error)),
                                    RD_STR_ToF(rd_kafka_error_is_fatal(error)));
                        else
                                TEST_SAY("Scenario #%d %s succeeded\n", i,
                                         commit_str);

                        if (!scenario[i].exp_err || exp_successful_abort) {
                                TEST_ASSERT(!error,
                                            "Expected #%d %s to succeed, "
                                            "got %s",
                                            i, commit_str,
                                            rd_kafka_error_string(error));
                                continue;
                        }


                        TEST_ASSERT(error != NULL, "Expected #%d %s to fail", i,
                                    commit_str);
                        TEST_ASSERT(scenario[i].exp_err ==
                                        rd_kafka_error_code(error),
                                    "Scenario #%d: expected %s, not %s", i,
                                    rd_kafka_err2name(scenario[i].exp_err),
                                    rd_kafka_error_name(error));
                        TEST_ASSERT(
                            scenario[i].exp_retriable ==
                                (rd_bool_t)rd_kafka_error_is_retriable(error),
                            "Scenario #%d: retriable mismatch", i);
                        TEST_ASSERT(
                            scenario[i].exp_abortable ==
                                (rd_bool_t)rd_kafka_error_txn_requires_abort(
                                    error),
                            "Scenario #%d: abortable mismatch", i);
                        TEST_ASSERT(
                            scenario[i].exp_fatal ==
                                (rd_bool_t)rd_kafka_error_is_fatal(error),
                            "Scenario #%d: fatal mismatch", i);

                        /* Handle errors according to the error flags */
                        if (rd_kafka_error_is_fatal(error)) {
                                TEST_SAY("Fatal error, destroying producer\n");
                                rd_kafka_error_destroy(error);
                                rd_kafka_destroy(rk);
                                rk = NULL; /* Will be re-created on the next
                                            * loop iteration. */

                        } else if (rd_kafka_error_txn_requires_abort(error)) {
                                rd_kafka_error_destroy(error);
                                TEST_SAY(
                                    "Abortable error, "
                                    "aborting transaction\n");
                                TEST_CALL_ERROR__(
                                    rd_kafka_abort_transaction(rk, -1));

                        } else if (rd_kafka_error_is_retriable(error)) {
                                rd_kafka_error_destroy(error);
                                TEST_SAY("Retriable error, retrying %s once\n",
                                         commit_str);
                                if (commit)
                                        TEST_CALL_ERROR__(
                                            rd_kafka_commit_transaction(rk,
                                                                        5000));
                                else
                                        TEST_CALL_ERROR__(
                                            rd_kafka_abort_transaction(rk,
                                                                       5000));
                        } else {
                                TEST_FAIL(
                                    "Scenario #%d %s: "
                                    "Permanent error without enough "
                                    "hints to proceed: %s\n",
                                    i, commit_str,
                                    rd_kafka_error_string(error));
                        }
                }
        }

        /* All done */
        if (rk)
                rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Test that the commit/abort works properly with infinite timeout.
 */
static void do_test_txn_endtxn_infinite(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster = NULL;
        const char *txnid                 = "myTxnId";
        int i;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, NULL);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        for (i = 0; i < 2; i++) {
                rd_bool_t commit       = i == 0;
                const char *commit_str = commit ? "commit" : "abort";
                rd_kafka_error_t *error;
                test_timing_t t_call;

                /* Messages will fail on as the transaction fails,
                 * ignore the DR error */
                test_curr->ignore_dr_err = rd_true;

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                TEST_CALL_ERR__(rd_kafka_producev(
                    rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_VALUE("hi", 2),
                    RD_KAFKA_V_END));

                /*
                 * Commit/abort transaction, first with som retriable failures,
                 * then success.
                 */
                rd_kafka_mock_push_request_errors(
                    mcluster, RD_KAFKAP_EndTxn, 10,
                    RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR);

                rd_sleep(1);

                TIMING_START(&t_call, "%s_transaction()", commit_str);
                if (commit)
                        error = rd_kafka_commit_transaction(rk, -1);
                else
                        error = rd_kafka_abort_transaction(rk, -1);
                TIMING_STOP(&t_call);

                TEST_SAY("%s returned %s\n", commit_str,
                         error ? rd_kafka_error_string(error) : "success");

                TEST_ASSERT(!error, "Expected %s to succeed, got %s",
                            commit_str, rd_kafka_error_string(error));
        }

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}



/**
 * @brief Test that the commit/abort user timeout is honoured.
 */
static void do_test_txn_endtxn_timeout(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster = NULL;
        const char *txnid                 = "myTxnId";
        int i;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, NULL);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        for (i = 0; i < 2; i++) {
                rd_bool_t commit       = i == 0;
                const char *commit_str = commit ? "commit" : "abort";
                rd_kafka_error_t *error;
                test_timing_t t_call;

                /* Messages will fail as the transaction fails,
                 * ignore the DR error */
                test_curr->ignore_dr_err = rd_true;

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                TEST_CALL_ERR__(rd_kafka_producev(
                    rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_VALUE("hi", 2),
                    RD_KAFKA_V_END));

                /*
                 * Commit/abort transaction, first with some retriable failures
                 * whos retries exceed the user timeout.
                 */
                rd_kafka_mock_push_request_errors(
                    mcluster, RD_KAFKAP_EndTxn, 10,
                    RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR);

                rd_sleep(1);

                TIMING_START(&t_call, "%s_transaction()", commit_str);
                if (commit)
                        error = rd_kafka_commit_transaction(rk, 100);
                else
                        error = rd_kafka_abort_transaction(rk, 100);
                TIMING_STOP(&t_call);

                TEST_SAY_ERROR(error, "%s returned: ", commit_str);
                TEST_ASSERT(error != NULL, "Expected %s to fail", commit_str);
                TEST_ASSERT(
                    rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "Expected %s to fail with timeout, not %s: %s", commit_str,
                    rd_kafka_error_name(error), rd_kafka_error_string(error));
                TEST_ASSERT(rd_kafka_error_is_retriable(error),
                            "%s failure should raise a retriable error",
                            commit_str);
                rd_kafka_error_destroy(error);

                /* Now call it again with an infinite timeout, should work. */
                TIMING_START(&t_call, "%s_transaction() nr 2", commit_str);
                if (commit)
                        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
                else
                        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
                TIMING_STOP(&t_call);
        }

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}



/**
 * @brief Test commit/abort inflight timeout behaviour, which should result
 *        in a retriable error.
 */
static void do_test_txn_endtxn_timeout_inflight(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster = NULL;
        const char *txnid                 = "myTxnId";
        int32_t coord_id                  = 1;
        int i;

        SUB_TEST();

        allowed_error          = RD_KAFKA_RESP_ERR__TIMED_OUT;
        test_curr->is_fatal_cb = error_is_fatal_cb;

        rk = create_txn_producer(&mcluster, txnid, 1, "transaction.timeout.ms",
                                 "5000", NULL);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        for (i = 0; i < 2; i++) {
                rd_bool_t commit       = i == 0;
                const char *commit_str = commit ? "commit" : "abort";
                rd_kafka_error_t *error;
                test_timing_t t_call;

                /* Messages will fail as the transaction fails,
                 * ignore the DR error */
                test_curr->ignore_dr_err = rd_true;

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                TEST_CALL_ERR__(rd_kafka_producev(
                    rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_VALUE("hi", 2),
                    RD_KAFKA_V_END));

                /* Let EndTxn & EndTxn retry timeout */
                rd_kafka_mock_broker_push_request_error_rtts(
                    mcluster, coord_id, RD_KAFKAP_EndTxn, 2,
                    RD_KAFKA_RESP_ERR_NO_ERROR, 10000,
                    RD_KAFKA_RESP_ERR_NO_ERROR, 10000);

                rd_sleep(1);

                TIMING_START(&t_call, "%s_transaction()", commit_str);
                if (commit)
                        error = rd_kafka_commit_transaction(rk, 4000);
                else
                        error = rd_kafka_abort_transaction(rk, 4000);
                TIMING_STOP(&t_call);

                TEST_SAY_ERROR(error, "%s returned: ", commit_str);
                TEST_ASSERT(error != NULL, "Expected %s to fail", commit_str);
                TEST_ASSERT(
                    rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "Expected %s to fail with timeout, not %s: %s", commit_str,
                    rd_kafka_error_name(error), rd_kafka_error_string(error));
                TEST_ASSERT(rd_kafka_error_is_retriable(error),
                            "%s failure should raise a retriable error",
                            commit_str);
                rd_kafka_error_destroy(error);

                /* Now call it again with an infinite timeout, should work. */
                TIMING_START(&t_call, "%s_transaction() nr 2", commit_str);
                if (commit)
                        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
                else
                        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
                TIMING_STOP(&t_call);
        }

        /* All done */

        rd_kafka_destroy(rk);

        allowed_error          = RD_KAFKA_RESP_ERR_NO_ERROR;
        test_curr->is_fatal_cb = NULL;

        SUB_TEST_PASS();
}



/**
 * @brief Test that EndTxn is properly sent for aborted transactions
 *        even if AddOffsetsToTxnRequest was retried.
 *        This is a check for a txn_req_cnt bug.
 */
static void do_test_txn_req_cnt(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        const char *txnid = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, NULL);

        /* Messages will fail on abort(), ignore the DR error */
        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * Send some arbitrary offsets, first with some failures, then
         * succeed.
         */
        offsets = rd_kafka_topic_partition_list_new(2);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 40)->offset =
            999999111;

        rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_AddOffsetsToTxn,
                                          2,
                                          RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
                                          RD_KAFKA_RESP_ERR_NOT_COORDINATOR);

        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_TxnOffsetCommit, 2,
            RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
            RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Test abortable errors using mock broker error injections
 *        and code coverage checks.
 */
static void do_test_txn_requires_abort_errors(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        int r;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * 1. Fail on produce
         */
        TEST_SAY("1. Fail on produce\n");

        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Wait for messages to fail */
        test_flush(rk, 5000);

        /* Any other transactional API should now raise an error */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        error =
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);
        TEST_ASSERT(error, "expected error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "expected abortable error, not %s",
                    rd_kafka_error_string(error));
        TEST_SAY("Error %s: %s\n", rd_kafka_error_name(error),
                 rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /*
         * 2. Restart transaction and fail on AddPartitionsToTxn
         */
        TEST_SAY("2. Fail on AddPartitionsToTxn\n");

        /* First refresh proper Metadata to clear the topic's auth error,
         * otherwise the produce() below will fail immediately. */
        r = test_get_partition_count(rk, "mytopic", 5000);
        TEST_ASSERT(r > 0, "Expected topic %s to exist", "mytopic");

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_AddPartitionsToTxn, 1,
            RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        error = rd_kafka_commit_transaction(rk, 5000);
        TEST_ASSERT(error, "commit_transaction should have failed");
        TEST_SAY("commit_transaction() error %s: %s\n",
                 rd_kafka_error_name(error), rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /*
         * 3. Restart transaction and fail on AddOffsetsToTxn
         */
        TEST_SAY("3. Fail on AddOffsetsToTxn\n");

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_AddOffsetsToTxn, 1,
            RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED);

        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        error =
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
        TEST_ASSERT(error, "Expected send_offsets..() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) ==
                        RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
                    "expected send_offsets_to_transaction() to fail with "
                    "group auth error: not %s",
                    rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);


        error = rd_kafka_commit_transaction(rk, 5000);
        TEST_ASSERT(error, "commit_transaction should have failed");
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Test error handling and recover for when broker goes down during
 *        an ongoing transaction.
 */
static void do_test_txn_broker_down_in_txn(rd_bool_t down_coord) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int32_t coord_id, leader_id, down_id;
        const char *down_what;
        rd_kafka_resp_err_t err;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int msgcnt                   = 1000;
        int remains                  = 0;

        /* Assign coordinator and leader to two different brokers */
        coord_id  = 1;
        leader_id = 2;
        if (down_coord) {
                down_id   = coord_id;
                down_what = "coordinator";
        } else {
                down_id   = leader_id;
                down_what = "leader";
        }

        SUB_TEST_QUICK("Test %s down", down_what);

        rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);

        /* Broker down is not a test-failing error */
        allowed_error          = RD_KAFKA_RESP_ERR__TRANSPORT;
        test_curr->is_fatal_cb = error_is_fatal_cb;

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
                                  msgcnt / 2, NULL, 0, &remains);

        TEST_SAY("Bringing down %s %" PRId32 "\n", down_what, down_id);
        rd_kafka_mock_broker_set_down(mcluster, down_id);

        rd_kafka_flush(rk, 3000);

        /* Produce remaining messages */
        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
                                  msgcnt / 2, msgcnt / 2, NULL, 0, &remains);

        rd_sleep(2);

        TEST_SAY("Bringing up %s %" PRId32 "\n", down_what, down_id);
        rd_kafka_mock_broker_set_up(mcluster, down_id);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains);

        rd_kafka_destroy(rk);

        test_curr->is_fatal_cb = NULL;

        SUB_TEST_PASS();
}



/**
 * @brief Advance the coord_id to the next broker.
 */
static void set_next_coord(rd_kafka_mock_cluster_t *mcluster,
                           const char *transactional_id,
                           int broker_cnt,
                           int32_t *coord_idp) {
        int32_t new_coord_id;

        new_coord_id = 1 + ((*coord_idp) % (broker_cnt));
        TEST_SAY("Changing transaction coordinator from %" PRId32 " to %" PRId32
                 "\n",
                 *coord_idp, new_coord_id);
        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      new_coord_id);

        *coord_idp = new_coord_id;
}

/**
 * @brief Switch coordinator during a transaction.
 *
 */
static void do_test_txn_switch_coordinator(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int32_t coord_id;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        const int broker_cnt         = 5;
        const int iterations         = 20;
        int i;

        test_timeout_set(iterations * 10);

        SUB_TEST("Test switching coordinators");

        rk = create_txn_producer(&mcluster, transactional_id, broker_cnt, NULL);

        coord_id = 1;
        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      coord_id);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        for (i = 0; i < iterations; i++) {
                const int msgcnt = 100;
                int remains      = 0;

                set_next_coord(mcluster, transactional_id, broker_cnt,
                               &coord_id);

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
                                   msgcnt / 2, NULL, 0);

                if (!(i % 3))
                        set_next_coord(mcluster, transactional_id, broker_cnt,
                                       &coord_id);

                /* Produce remaining messages */
                test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
                                          msgcnt / 2, msgcnt / 2, NULL, 0,
                                          &remains);

                if ((i & 1) || !(i % 8))
                        set_next_coord(mcluster, transactional_id, broker_cnt,
                                       &coord_id);


                if (!(i % 5)) {
                        test_curr->ignore_dr_err = rd_false;
                        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

                } else {
                        test_curr->ignore_dr_err = rd_true;
                        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
                }
        }


        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Switch coordinator during a transaction when AddOffsetsToTxn
 *        are sent. #3571.
 */
static void do_test_txn_switch_coordinator_refresh(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST("Test switching coordinators (refresh)");

        rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      1);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Switch the coordinator so that AddOffsetsToTxnRequest
         * will respond with NOT_COORDINATOR. */
        TEST_SAY("Switching to coordinator 2\n");
        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      2);

        /*
         * Send some arbitrary offsets.
         */
        offsets = rd_kafka_topic_partition_list_new(4);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 29)->offset =
            99999;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
            rk, offsets, cgmetadata, 20 * 1000));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);


        /* Produce some messages */
        test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, 10, NULL, 0);

        /* And commit the transaction */
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Test fatal error handling when transactions are not supported
 *        by the broker.
 */
static void do_test_txns_not_supported(void) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;

        SUB_TEST_QUICK();

        test_conf_init(&conf, NULL, 10);

        test_conf_set(conf, "transactional.id", "myxnid");
        test_conf_set(conf, "bootstrap.servers", ",");
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        /* Create mock cluster */
        mcluster = rd_kafka_mock_cluster_new(rk, 3);

        /* Disable InitProducerId */
        rd_kafka_mock_set_apiversion(mcluster, 22 /*InitProducerId*/, -1, -1);


        rd_kafka_brokers_add(rk, rd_kafka_mock_cluster_bootstraps(mcluster));



        error = rd_kafka_init_transactions(rk, 5 * 1000);
        TEST_SAY("init_transactions() returned %s: %s\n",
                 error ? rd_kafka_error_name(error) : "success",
                 error ? rd_kafka_error_string(error) : "success");

        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) ==
                        RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                    "Expected init_transactions() to fail with %s, not %s: %s",
                    rd_kafka_err2name(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE),
                    rd_kafka_error_name(error), rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("test"),
                                RD_KAFKA_V_KEY("test", 4), RD_KAFKA_V_END);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
                    "Expected producev() to fail with %s, not %s",
                    rd_kafka_err2name(RD_KAFKA_RESP_ERR__FATAL),
                    rd_kafka_err2name(err));

        rd_kafka_mock_cluster_destroy(mcluster);

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief CONCURRENT_TRANSACTION on AddOffsets.. should be retried.
 */
static void do_test_txns_send_offsets_concurrent_is_retried(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Wait for messages to be delivered */
        test_flush(rk, 5000);


        /*
         * Have AddOffsetsToTxn fail but eventually succeed due to
         * infinite retries.
         */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_AddOffsetsToTxn,
            1 + 5, /* first request + some retries */
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);

        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Verify that send_offsets_to_transaction() with no eligible offsets
 *        is handled properly - the call should succeed immediately and be
 *        repeatable.
 */
static void do_test_txns_send_offsets_non_eligible(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Wait for messages to be delivered */
        test_flush(rk, 5000);

        /* Empty offsets list */
        offsets = rd_kafka_topic_partition_list_new(0);

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        /* Now call it again, should also succeed. */
        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Verify that request timeouts don't cause crash (#2913).
 */
static void do_test_txns_no_timeout_crash(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST_QUICK();

        rk =
            create_txn_producer(&mcluster, "txnid", 3, "socket.timeout.ms",
                                "1000", "transaction.timeout.ms", "5000", NULL);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        test_flush(rk, -1);

        /* Delay all broker connections */
        if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 2000)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 2000)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 2000)))
                TEST_FAIL("Failed to set broker RTT: %s",
                          rd_kafka_err2str(err));

        /* send_offsets..() should now time out */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        error =
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
        TEST_ASSERT(error, "Expected send_offsets..() to fail");
        TEST_SAY("send_offsets..() failed with %serror: %s\n",
                 rd_kafka_error_is_retriable(error) ? "retriable " : "",
                 rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "expected send_offsets_to_transaction() to fail with "
                    "timeout, not %s",
                    rd_kafka_error_name(error));
        TEST_ASSERT(rd_kafka_error_is_retriable(error),
                    "expected send_offsets_to_transaction() to fail with "
                    "a retriable error");
        rd_kafka_error_destroy(error);

        /* Reset delay and try again */
        if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 0)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 0)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 0)))
                TEST_FAIL("Failed to reset broker RTT: %s",
                          rd_kafka_err2str(err));

        TEST_SAY("Retrying send_offsets..()\n");
        error =
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
        TEST_ASSERT(!error, "Expected send_offsets..() to succeed, got: %s",
                    rd_kafka_error_string(error));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        /* All done */
        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Test auth failure handling.
 */
static void do_test_txn_auth_failure(int16_t ApiKey,
                                     rd_kafka_resp_err_t ErrorCode) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;

        SUB_TEST_QUICK("ApiKey=%s ErrorCode=%s", rd_kafka_ApiKey2str(ApiKey),
                       rd_kafka_err2name(ErrorCode));

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        rd_kafka_mock_push_request_errors(mcluster, ApiKey, 1, ErrorCode);

        error = rd_kafka_init_transactions(rk, 5000);
        TEST_ASSERT(error, "Expected init_transactions() to fail");

        TEST_SAY("init_transactions() failed: %s: %s\n",
                 rd_kafka_err2name(rd_kafka_error_code(error)),
                 rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_code(error) == ErrorCode,
                    "Expected error %s, not %s", rd_kafka_err2name(ErrorCode),
                    rd_kafka_err2name(rd_kafka_error_code(error)));
        TEST_ASSERT(rd_kafka_error_is_fatal(error),
                    "Expected error to be fatal");
        TEST_ASSERT(!rd_kafka_error_is_retriable(error),
                    "Expected error to not be retriable");
        rd_kafka_error_destroy(error);

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Issue #3041: Commit fails due to message flush() taking too long,
 *        eventually resulting in an unabortable error and failure to
 *        re-init the transactional producer.
 */
static void do_test_txn_flush_timeout(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        rd_kafka_error_t *error;
        const char *txnid      = "myTxnId";
        const char *topic      = "myTopic";
        const int32_t coord_id = 2;
        int msgcounter         = 0;
        rd_bool_t is_retry     = rd_false;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, "message.timeout.ms",
                                 "10000", "transaction.timeout.ms", "10000",
                                 /* Speed up coordinator reconnect */
                                 "reconnect.backoff.max.ms", "1000", NULL);


        /* Broker down is not a test-failing error */
        test_curr->is_fatal_cb = error_is_fatal_cb;
        allowed_error          = RD_KAFKA_RESP_ERR__TRANSPORT;

        rd_kafka_mock_topic_create(mcluster, topic, 2, 3);

        /* Set coordinator so we can disconnect it later */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, coord_id);

        /*
         * Init transactions
         */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

retry:
        if (!is_retry) {
                /* First attempt should fail. */

                test_curr->ignore_dr_err = rd_true;
                test_curr->exp_dr_err    = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

                /* Assign invalid partition leaders for some partitions so
                 * that messages will not be delivered. */
                rd_kafka_mock_partition_set_leader(mcluster, topic, 0, -1);
                rd_kafka_mock_partition_set_leader(mcluster, topic, 1, -1);

        } else {
                /* The retry should succeed */
                test_curr->ignore_dr_err = rd_false;
                test_curr->exp_dr_err    = is_retry
                                               ? RD_KAFKA_RESP_ERR_NO_ERROR
                                               : RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

                rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
                rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
        }


        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * Produce some messages to specific partitions and random.
         */
        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 100, NULL, 10,
                                  &msgcounter);
        test_produce_msgs2_nowait(rk, topic, 1, 0, 0, 100, NULL, 10,
                                  &msgcounter);
        test_produce_msgs2_nowait(rk, topic, RD_KAFKA_PARTITION_UA, 0, 0, 100,
                                  NULL, 10, &msgcounter);


        /*
         * Send some arbitrary offsets.
         */
        offsets = rd_kafka_topic_partition_list_new(4);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 49)->offset =
            999999111;
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset =
            999;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 34)->offset =
            123456789;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        rd_sleep(2);

        if (!is_retry) {
                /* Now disconnect the coordinator. */
                TEST_SAY("Disconnecting transaction coordinator %" PRId32 "\n",
                         coord_id);
                rd_kafka_mock_broker_set_down(mcluster, coord_id);
        }

        /*
         * Start committing.
         */
        error = rd_kafka_commit_transaction(rk, -1);

        if (!is_retry) {
                TEST_ASSERT(error != NULL, "Expected commit to fail");
                TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                         rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);

        } else {
                TEST_ASSERT(!error, "Expected commit to succeed, not: %s",
                            rd_kafka_error_string(error));
        }

        if (!is_retry) {
                /*
                 * Bring the coordinator back up.
                 */
                rd_kafka_mock_broker_set_up(mcluster, coord_id);
                rd_sleep(2);

                /*
                 * Abort, and try again, this time without error.
                 */
                TEST_SAY("Aborting and retrying\n");
                is_retry = rd_true;

                TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 60000));
                goto retry;
        }

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief ESC-4424: rko is reused in response handler after destroy in coord_req
 *        sender due to bad state.
 *
 * This is somewhat of a race condition so we need to perform a couple of
 * iterations before it hits, usually 2 or 3, so we try at least 15 times.
 */
static void do_test_txn_coord_req_destroy(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int i;
        int errcnt = 0;

        SUB_TEST();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        for (i = 0; i < 15; i++) {
                rd_kafka_error_t *error;
                rd_kafka_resp_err_t err;
                rd_kafka_topic_partition_list_t *offsets;
                rd_kafka_consumer_group_metadata_t *cgmetadata;

                test_timeout_set(10);

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                /*
                 * Inject errors to trigger retries
                 */
                rd_kafka_mock_push_request_errors(
                    mcluster, RD_KAFKAP_AddPartitionsToTxn,
                    2, /* first request + number of internal retries */
                    RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                    RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);

                rd_kafka_mock_push_request_errors(
                    mcluster, RD_KAFKAP_AddOffsetsToTxn,
                    3, /* first request + number of internal retries */
                    RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                    RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                    RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);

                err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                        RD_KAFKA_V_VALUE("hi", 2),
                                        RD_KAFKA_V_END);
                TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

                rd_kafka_mock_push_request_errors(
                    mcluster, RD_KAFKAP_Produce, 4,
                    RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
                    RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
                    RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
                    RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
                /* FIXME: When KIP-360 is supported, add this error:
                 *        RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER */

                err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                        RD_KAFKA_V_VALUE("hi", 2),
                                        RD_KAFKA_V_END);
                TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));


                /*
                 * Send offsets to transaction
                 */

                offsets = rd_kafka_topic_partition_list_new(1);
                rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)
                    ->offset = 12;

                cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

                error = rd_kafka_send_offsets_to_transaction(rk, offsets,
                                                             cgmetadata, -1);

                TEST_SAY("send_offsets_to_transaction() #%d: %s\n", i,
                         rd_kafka_error_string(error));

                /* As we can't control the exact timing and sequence
                 * of requests this sometimes fails and sometimes succeeds,
                 * but we run the test enough times to trigger at least
                 * one failure. */
                if (error) {
                        TEST_SAY(
                            "send_offsets_to_transaction() #%d "
                            "failed (expectedly): %s\n",
                            i, rd_kafka_error_string(error));
                        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                                    "Expected abortable error for #%d", i);
                        rd_kafka_error_destroy(error);
                        errcnt++;
                }

                rd_kafka_consumer_group_metadata_destroy(cgmetadata);
                rd_kafka_topic_partition_list_destroy(offsets);

                /* Allow time for internal retries */
                rd_sleep(2);

                TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));
        }

        TEST_SAY("Got %d errors\n", errcnt);
        TEST_ASSERT(errcnt > 0,
                    "Expected at least one send_offets_to_transaction() "
                    "failure");

        /* All done */

        rd_kafka_destroy(rk);
}


static rd_atomic32_t multi_find_req_cnt;

static rd_kafka_resp_err_t
multi_find_on_response_received_cb(rd_kafka_t *rk,
                                   int sockfd,
                                   const char *brokername,
                                   int32_t brokerid,
                                   int16_t ApiKey,
                                   int16_t ApiVersion,
                                   int32_t CorrId,
                                   size_t size,
                                   int64_t rtt,
                                   rd_kafka_resp_err_t err,
                                   void *ic_opaque) {
        rd_kafka_mock_cluster_t *mcluster = rd_kafka_handle_mock_cluster(rk);
        rd_bool_t done = rd_atomic32_get(&multi_find_req_cnt) > 10000;

        if (ApiKey != RD_KAFKAP_AddOffsetsToTxn || done)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        TEST_SAY("on_response_received_cb: %s: %s: brokerid %" PRId32
                 ", ApiKey %hd, CorrId %d, rtt %.2fms, %s: %s\n",
                 rd_kafka_name(rk), brokername, brokerid, ApiKey, CorrId,
                 rtt != -1 ? (float)rtt / 1000.0 : 0.0,
                 done ? "already done" : "not done yet",
                 rd_kafka_err2name(err));


        if (rd_atomic32_add(&multi_find_req_cnt, 1) == 1) {
                /* Trigger a broker down/up event, which in turns
                 * triggers the coord_req_fsm(). */
                rd_kafka_mock_broker_set_down(mcluster, 2);
                rd_kafka_mock_broker_set_up(mcluster, 2);
                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        /* Trigger a broker down/up event, which in turns
         * triggers the coord_req_fsm(). */
        rd_kafka_mock_broker_set_down(mcluster, 3);
        rd_kafka_mock_broker_set_up(mcluster, 3);

        /* Clear the downed broker's latency so that it reconnects
         * quickly, otherwise the ApiVersionRequest will be delayed and
         * this will in turn delay the -> UP transition that we need to
         * trigger the coord_reqs. */
        rd_kafka_mock_broker_set_rtt(mcluster, 3, 0);

        /* Only do this down/up once */
        rd_atomic32_add(&multi_find_req_cnt, 10000);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


/**
 * @brief ESC-4444: multiple FindCoordinatorRequests are sent referencing
 *        the same coord_req_t, but the first one received will destroy
 *        the coord_req_t object and make the subsequent FindCoordingResponses
 *        reference a freed object.
 *
 * What we want to achieve is this sequence:
 *  1. AddOffsetsToTxnRequest + Response which..
 *  2. Triggers TxnOffsetCommitRequest, but the coordinator is not known, so..
 *  3. Triggers a FindCoordinatorRequest
 *  4. FindCoordinatorResponse from 3 is received ..
 *  5. A TxnOffsetCommitRequest is sent from coord_req_fsm().
 *  6. Another broker changing state to Up triggers coord reqs again, which..
 *  7. Triggers a second TxnOffsetCommitRequest from coord_req_fsm().
 *  7. FindCoordinatorResponse from 5 is received, references the destroyed rko
 *     and crashes.
 */
static void do_test_txn_coord_req_multi_find(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        const char *txnid = "txnid", *groupid = "mygroupid", *topic = "mytopic";
        int i;

        SUB_TEST();

        rd_atomic32_init(&multi_find_req_cnt, 0);

        on_response_received_cb = multi_find_on_response_received_cb;
        rk                      = create_txn_producer(&mcluster, txnid, 3,
                                                      /* Need connections to all brokers so we
                                                       * can trigger coord_req_fsm events
                                                       * by toggling connections. */
                                                      "enable.sparse.connections", "false",
                                                      /* Set up on_response_received interceptor */
                                                      "on_response_received", "", NULL);

        /* Let broker 1 be both txn and group coordinator
         * so that the group coordinator connection is up when it is time
         * send the TxnOffsetCommitRequest. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1);
        rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);

        /* Set broker 1, 2, and 3 as leaders for a partition each and
         * later produce to both partitions so we know there's a connection
         * to all brokers. */
        rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 2);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 3);

        /* Broker down is not a test-failing error */
        allowed_error          = RD_KAFKA_RESP_ERR__TRANSPORT;
        test_curr->is_fatal_cb = error_is_fatal_cb;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        for (i = 0; i < 3; i++) {
                err = rd_kafka_producev(
                    rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(i),
                    RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
                TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
        }

        test_flush(rk, 5000);

        /*
         * send_offsets_to_transaction() will query for the group coordinator,
         * we need to make those requests slow so that multiple requests are
         * sent.
         */
        for (i = 1; i <= 3; i++)
                rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 4000);

        /*
         * Send offsets to transaction
         */

        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;

        cgmetadata = rd_kafka_consumer_group_metadata_new(groupid);

        error =
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);

        TEST_SAY("send_offsets_to_transaction() %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(!error, "send_offsets_to_transaction() failed: %s",
                    rd_kafka_error_string(error));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        /* Clear delay */
        for (i = 1; i <= 3; i++)
                rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 0);

        rd_sleep(5);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /* All done */

        TEST_ASSERT(rd_atomic32_get(&multi_find_req_cnt) > 10000,
                    "on_request_sent interceptor did not trigger properly");

        rd_kafka_destroy(rk);

        on_response_received_cb = NULL;

        SUB_TEST_PASS();
}


/**
 * @brief ESC-4410: adding producer partitions gradually will trigger multiple
 *        AddPartitionsToTxn requests. Due to a bug the third partition to be
 *        registered would hang in PEND_TXN state.
 *
 * Trigger this behaviour by having two outstanding AddPartitionsToTxn requests
 * at the same time, followed by a need for a third:
 *
 * 1. Set coordinator broker rtt high (to give us time to produce).
 * 2. Produce to partition 0, will trigger first AddPartitionsToTxn.
 * 3. Produce to partition 1, will trigger second AddPartitionsToTxn.
 * 4. Wait for second AddPartitionsToTxn response.
 * 5. Produce to partition 2, should trigger AddPartitionsToTxn, but bug
 *    causes it to be stale in pending state.
 */

static rd_atomic32_t multi_addparts_resp_cnt;
static rd_kafka_resp_err_t
multi_addparts_response_received_cb(rd_kafka_t *rk,
                                    int sockfd,
                                    const char *brokername,
                                    int32_t brokerid,
                                    int16_t ApiKey,
                                    int16_t ApiVersion,
                                    int32_t CorrId,
                                    size_t size,
                                    int64_t rtt,
                                    rd_kafka_resp_err_t err,
                                    void *ic_opaque) {

        if (ApiKey == RD_KAFKAP_AddPartitionsToTxn) {
                TEST_SAY("on_response_received_cb: %s: %s: brokerid %" PRId32
                         ", ApiKey %hd, CorrId %d, rtt %.2fms, count %" PRId32
                         ": %s\n",
                         rd_kafka_name(rk), brokername, brokerid, ApiKey,
                         CorrId, rtt != -1 ? (float)rtt / 1000.0 : 0.0,
                         rd_atomic32_get(&multi_addparts_resp_cnt),
                         rd_kafka_err2name(err));

                rd_atomic32_add(&multi_addparts_resp_cnt, 1);
        }

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}


static void do_test_txn_addparts_req_multi(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        const char *txnid = "txnid", *topic = "mytopic";
        int32_t txn_coord = 2;

        SUB_TEST();

        rd_atomic32_init(&multi_addparts_resp_cnt, 0);

        on_response_received_cb = multi_addparts_response_received_cb;
        rk = create_txn_producer(&mcluster, txnid, 3, "linger.ms", "0",
                                 "message.timeout.ms", "9000",
                                 /* Set up on_response_received interceptor */
                                 "on_response_received", "", NULL);

        /* Let broker 1 be txn coordinator. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        rd_kafka_mock_topic_create(mcluster, topic, 3, 1);

        /* Set partition leaders to non-txn-coord broker so they wont
         * be affected by rtt delay */
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 1);



        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        /*
         * Run one transaction first to let the client familiarize with
         * the topic, this avoids metadata lookups, etc, when the real
         * test is run.
         */
        TEST_SAY("Running seed transaction\n");
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
        TEST_CALL_ERR__(rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic),
                                          RD_KAFKA_V_VALUE("seed", 4),
                                          RD_KAFKA_V_END));
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));


        /*
         * Now perform test transaction with rtt delays
         */
        TEST_SAY("Running test transaction\n");

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Reset counter */
        rd_atomic32_set(&multi_addparts_resp_cnt, 0);

        /* Add latency to txn coordinator so we can pace our produce() calls */
        rd_kafka_mock_broker_set_rtt(mcluster, txn_coord, 1000);

        /* Produce to partition 0 */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        rd_usleep(500 * 1000, NULL);

        /* Produce to partition 1 */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(1),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        TEST_SAY("Waiting for two AddPartitionsToTxnResponse\n");
        while (rd_atomic32_get(&multi_addparts_resp_cnt) < 2)
                rd_usleep(10 * 1000, NULL);

        TEST_SAY("%" PRId32 " AddPartitionsToTxnResponses seen\n",
                 rd_atomic32_get(&multi_addparts_resp_cnt));

        /* Produce to partition 2, this message will hang in
         * queue if the bug is not fixed. */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(2),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* Allow some extra time for things to settle before committing
         * transaction. */
        rd_usleep(1000 * 1000, NULL);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 10 * 1000));

        /* All done */
        rd_kafka_destroy(rk);

        on_response_received_cb = NULL;

        SUB_TEST_PASS();
}



/**
 * @brief Test handling of OffsetFetchRequest returning UNSTABLE_OFFSET_COMMIT.
 *
 * There are two things to test;
 *  - OffsetFetch triggered by committed() (and similar code paths)
 *  - OffsetFetch triggered by assign()
 */
static void do_test_unstable_offset_commit(void) {
        rd_kafka_t *rk, *c;
        rd_kafka_conf_t *c_conf;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        const char *topic              = "srctopic4";
        const int msgcnt               = 100;
        const int64_t offset_to_commit = msgcnt / 2;
        int i;
        int remains = 0;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_conf_init(&c_conf, NULL, 0);
        test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
        test_conf_set(c_conf, "bootstrap.servers",
                      rd_kafka_mock_cluster_bootstraps(mcluster));
        test_conf_set(c_conf, "enable.partition.eof", "true");
        test_conf_set(c_conf, "auto.offset.reset", "error");
        c = test_create_consumer("mygroup", NULL, c_conf, NULL);

        rd_kafka_mock_topic_create(mcluster, topic, 2, 3);

        /* Produce some messages to the topic so that the consumer has
         * something to read. */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, msgcnt, NULL, 0,
                                  &remains);
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));


        /* Commit offset */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
            offset_to_commit;
        TEST_CALL_ERR__(rd_kafka_commit(c, offsets, 0 /*sync*/));
        rd_kafka_topic_partition_list_destroy(offsets);

        /* Retrieve offsets by calling committed().
         *
         * Have OffsetFetch fail and retry, on the first iteration
         * the API timeout is higher than the amount of time the retries will
         * take and thus succeed, and on the second iteration the timeout
         * will be lower and thus fail. */
        for (i = 0; i < 2; i++) {
                rd_kafka_resp_err_t err;
                rd_kafka_resp_err_t exp_err =
                    i == 0 ? RD_KAFKA_RESP_ERR_NO_ERROR
                           : RD_KAFKA_RESP_ERR__TIMED_OUT;
                int timeout_ms = exp_err ? 200 : 5 * 1000;

                rd_kafka_mock_push_request_errors(
                    mcluster, RD_KAFKAP_OffsetFetch,
                    1 + 5, /* first request + some retries */
                    RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                    RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                    RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                    RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                    RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                    RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);

                offsets = rd_kafka_topic_partition_list_new(1);
                rd_kafka_topic_partition_list_add(offsets, topic, 0);

                err = rd_kafka_committed(c, offsets, timeout_ms);

                TEST_SAY("#%d: committed() returned %s (expected %s)\n", i,
                         rd_kafka_err2name(err), rd_kafka_err2name(exp_err));

                TEST_ASSERT(err == exp_err,
                            "#%d: Expected committed() to return %s, not %s", i,
                            rd_kafka_err2name(exp_err), rd_kafka_err2name(err));
                TEST_ASSERT(offsets->cnt == 1,
                            "Expected 1 committed offset, not %d",
                            offsets->cnt);
                if (!exp_err)
                        TEST_ASSERT(offsets->elems[0].offset ==
                                        offset_to_commit,
                                    "Expected committed offset %" PRId64
                                    ", "
                                    "not %" PRId64,
                                    offset_to_commit, offsets->elems[0].offset);
                else
                        TEST_ASSERT(offsets->elems[0].offset < 0,
                                    "Expected no committed offset, "
                                    "not %" PRId64,
                                    offsets->elems[0].offset);

                rd_kafka_topic_partition_list_destroy(offsets);
        }

        TEST_SAY("Phase 2: OffsetFetch lookup through assignment\n");
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
            RD_KAFKA_OFFSET_STORED;

        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_OffsetFetch,
            1 + 5, /* first request + some retries */
            RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
            RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
            RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
            RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
            RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
            RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);

        test_consumer_incremental_assign("assign", c, offsets);
        rd_kafka_topic_partition_list_destroy(offsets);

        test_consumer_poll_exact("consume", c, 0, 1 /*eof*/, 0, msgcnt / 2,
                                 rd_true /*exact counts*/, NULL);

        /* All done */
        rd_kafka_destroy(c);
        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief If a message times out locally before being attempted to send
 *        and commit_transaction() is called, the transaction must not succeed.
 *        https://github.com/confluentinc/confluent-kafka-dotnet/issues/1568
 */
static void do_test_commit_after_msg_timeout(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int32_t coord_id, leader_id;
        rd_kafka_resp_err_t err;
        rd_kafka_error_t *error;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int remains                  = 0;

        SUB_TEST_QUICK();

        /* Assign coordinator and leader to two different brokers */
        coord_id  = 1;
        leader_id = 2;

        rk = create_txn_producer(&mcluster, transactional_id, 3,
                                 "message.timeout.ms", "5000",
                                 "transaction.timeout.ms", "10000", NULL);

        /* Broker down is not a test-failing error */
        allowed_error          = RD_KAFKA_RESP_ERR__TRANSPORT;
        test_curr->is_fatal_cb = error_is_fatal_cb;
        test_curr->exp_dr_err  = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_SAY("Bringing down %" PRId32 "\n", leader_id);
        rd_kafka_mock_broker_set_down(mcluster, leader_id);
        rd_kafka_mock_broker_set_down(mcluster, coord_id);

        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);

        error = rd_kafka_commit_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "expected commit_transaciton() to fail");
        TEST_SAY_ERROR(error, "commit_transaction() failed (as expected): ");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected txn_requires_abort error");
        rd_kafka_error_destroy(error);

        /* Bring the brokers up so the abort can complete */
        rd_kafka_mock_broker_set_up(mcluster, coord_id);
        rd_kafka_mock_broker_set_up(mcluster, leader_id);

        TEST_SAY("Aborting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        TEST_ASSERT(remains == 0, "%d message(s) were not flushed\n", remains);

        TEST_SAY("Attempting second transaction, which should succeed\n");
        test_curr->is_fatal_cb = error_is_fatal_cb;
        test_curr->exp_dr_err  = RD_KAFKA_RESP_ERR_NO_ERROR;

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains);

        rd_kafka_destroy(rk);

        allowed_error          = RD_KAFKA_RESP_ERR_NO_ERROR;
        test_curr->is_fatal_cb = NULL;

        SUB_TEST_PASS();
}


/**
 * @brief #3575: Verify that OUT_OF_ORDER_SEQ does not trigger an epoch bump
 *        during an ongoing transaction.
 *        The transaction should instead enter the abortable state.
 */
static void do_test_out_of_order_seq(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 1, leader = 2;
        const char *txnid = "myTxnId";
        test_timing_t timing;
        rd_kafka_resp_err_t err;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
                                 NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        rd_kafka_mock_partition_set_leader(mcluster, "mytopic", 0, leader);

        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb   = NULL;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));



        /* Produce one seeding message first to get the leader up and running */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
        test_flush(rk, -1);

        /* Let partition leader have a latency of 2 seconds
         * so that we can have multiple messages in-flight. */
        rd_kafka_mock_broker_set_rtt(mcluster, leader, 2 * 1000);

        /* Produce a message, let it fail with with different errors,
         * ending with OUT_OF_ORDER which previously triggered an
         * Epoch bump. */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 3,
            RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
            RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
            RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER);

        /* Produce three messages that will be delayed
         * and have errors injected.*/
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* Now sleep a short while so that the messages are processed
         * by the broker and errors are returned. */
        TEST_SAY("Sleeping..\n");
        rd_sleep(5);

        rd_kafka_mock_broker_set_rtt(mcluster, leader, 0);

        /* Produce a fifth message, should fail with ERR__STATE since
         * the transaction should have entered the abortable state. */
        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_PARTITION(0),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__STATE,
                    "Expected produce() to fail with ERR__STATE, not %s",
                    rd_kafka_err2name(err));
        TEST_SAY("produce() failed as expected: %s\n", rd_kafka_err2str(err));

        /* Commit the transaction, should fail with abortable error. */
        TIMING_START(&timing, "commit_transaction(-1)");
        error = rd_kafka_commit_transaction(rk, -1);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort the transaction */
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /* Run a new transaction without errors to verify that the
         * producer can recover. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Verify lossless delivery if topic disappears from Metadata for awhile.
 *
 * If a topic is removed from metadata inbetween transactions, the producer
 * will remove its partition state for the topic's partitions.
 * If later the same topic comes back (same topic instance, not a new creation)
 * then the producer must restore the previously used msgid/BaseSequence
 * in case the same Epoch is still used, or messages will be silently lost
 * as they would seem like legit duplicates to the broker.
 *
 * Reproduction:
 *   1. produce msgs to topic, commit transaction.
 *   2. remove topic from metadata
 *   3. make sure client updates its metadata, which removes the partition
 *      objects.
 *   4. restore the topic in metadata
 *   5. produce new msgs to topic, commit transaction.
 *   6. consume topic. All messages should be accounted for.
 */
static void do_test_topic_disappears_for_awhile(void) {
        rd_kafka_t *rk, *c;
        rd_kafka_conf_t *c_conf;
        rd_kafka_mock_cluster_t *mcluster;
        const char *topic = "mytopic";
        const char *txnid = "myTxnId";
        test_timing_t timing;
        int i;
        int msgcnt              = 0;
        const int partition_cnt = 10;

        SUB_TEST_QUICK();

        rk = create_txn_producer(
            &mcluster, txnid, 1, "batch.num.messages", "3", "linger.ms", "100",
            "topic.metadata.refresh.interval.ms", "2000", NULL);

        rd_kafka_mock_topic_create(mcluster, topic, partition_cnt, 1);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        for (i = 0; i < 2; i++) {
                int cnt                = 3 * 2 * partition_cnt;
                rd_bool_t remove_topic = (i % 2) == 0;
                /*
                 * Start a transaction
                 */
                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


                while (cnt-- >= 0) {
                        TEST_CALL_ERR__(rd_kafka_producev(
                            rk, RD_KAFKA_V_TOPIC(topic),
                            RD_KAFKA_V_PARTITION(cnt % partition_cnt),
                            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
                        msgcnt++;
                }

                /* Commit the transaction */
                TIMING_START(&timing, "commit_transaction(-1)");
                TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
                TIMING_STOP(&timing);



                if (remove_topic) {
                        /* Make it seem the topic is removed, refresh metadata,
                         * and then make the topic available again. */
                        const rd_kafka_metadata_t *md;

                        TEST_SAY("Marking topic as non-existent\n");

                        rd_kafka_mock_topic_set_error(
                            mcluster, topic,
                            RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);

                        TEST_CALL_ERR__(rd_kafka_metadata(rk, 0, NULL, &md,
                                                          tmout_multip(5000)));

                        rd_kafka_metadata_destroy(md);

                        rd_sleep(2);

                        TEST_SAY("Bringing topic back to life\n");
                        rd_kafka_mock_topic_set_error(
                            mcluster, topic, RD_KAFKA_RESP_ERR_NO_ERROR);
                }
        }

        TEST_SAY("Verifying messages by consumtion\n");
        test_conf_init(&c_conf, NULL, 0);
        test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
        test_conf_set(c_conf, "bootstrap.servers",
                      rd_kafka_mock_cluster_bootstraps(mcluster));
        test_conf_set(c_conf, "enable.partition.eof", "true");
        test_conf_set(c_conf, "auto.offset.reset", "earliest");
        c = test_create_consumer("mygroup", NULL, c_conf, NULL);

        test_consumer_subscribe(c, topic);
        test_consumer_poll_exact("consume", c, 0, partition_cnt, 0, msgcnt,
                                 rd_true /*exact*/, NULL);
        rd_kafka_destroy(c);


        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Test that group coordinator requests can handle an
 *        untimely disconnect.
 *
 * The transaction manager makes use of librdkafka coord_req to commit
 * transaction offsets to the group coordinator.
 * If the connection to the given group coordinator is not up the
 * coord_req code will request a connection once, but if this connection fails
 * there will be no new attempts and the coord_req will idle until either
 * destroyed or the connection is retried for other reasons.
 * This in turn stalls the send_offsets_to_transaction() call until the
 * transaction times out.
 *
 * There are two variants to this test based on switch_coord:
 *  - True - Switches the coordinator during the downtime.
 *           The client should detect this and send the request to the
 *           new coordinator.
 *  - False - The coordinator remains on the down broker. Client will reconnect
 *            when down broker comes up again.
 */
struct some_state {
        rd_kafka_mock_cluster_t *mcluster;
        rd_bool_t switch_coord;
        int32_t broker_id;
        const char *grpid;
};

static int delayed_up_cb(void *arg) {
        struct some_state *state = arg;
        rd_sleep(3);
        if (state->switch_coord) {
                TEST_SAY("Switching group coordinator to %" PRId32 "\n",
                         state->broker_id);
                rd_kafka_mock_coordinator_set(state->mcluster, "group",
                                              state->grpid, state->broker_id);
        } else {
                TEST_SAY("Bringing up group coordinator %" PRId32 "..\n",
                         state->broker_id);
                rd_kafka_mock_broker_set_up(state->mcluster, state->broker_id);
        }
        return 0;
}

static void do_test_disconnected_group_coord(rd_bool_t switch_coord) {
        const char *topic       = "mytopic";
        const char *txnid       = "myTxnId";
        const char *grpid       = "myGrpId";
        const int partition_cnt = 1;
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        struct some_state state = RD_ZERO_INIT;
        test_timing_t timing;
        thrd_t thrd;
        int ret;

        SUB_TEST_QUICK("switch_coord=%s", RD_STR_ToF(switch_coord));

        test_curr->is_fatal_cb = error_is_fatal_cb;
        allowed_error          = RD_KAFKA_RESP_ERR__TRANSPORT;

        rk = create_txn_producer(&mcluster, txnid, 3, NULL);

        rd_kafka_mock_topic_create(mcluster, topic, partition_cnt, 1);

        /* Broker 1: txn coordinator
         * Broker 2: group coordinator
         * Broker 3: partition leader & backup coord if switch_coord=true */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1);
        rd_kafka_mock_coordinator_set(mcluster, "group", grpid, 2);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 3);

        /* Bring down group coordinator so there are no undesired
         * connections to it. */
        rd_kafka_mock_broker_set_down(mcluster, 2);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
        test_flush(rk, -1);

        rd_sleep(1);

        /* Run a background thread that after 3s, which should be enough
         * to perform the first failed connection attempt, makes the
         * group coordinator available again. */
        state.switch_coord = switch_coord;
        state.mcluster     = mcluster;
        state.grpid        = grpid;
        state.broker_id    = switch_coord ? 3 : 2;
        if (thrd_create(&thrd, delayed_up_cb, &state) != thrd_success)
                TEST_FAIL("Failed to create thread");

        TEST_SAY("Calling send_offsets_to_transaction()\n");
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 1;
        cgmetadata = rd_kafka_consumer_group_metadata_new(grpid);

        TIMING_START(&timing, "send_offsets_to_transaction(-1)");
        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
        TIMING_STOP(&timing);
        TIMING_ASSERT(&timing, 0, 10 * 1000 /*10s*/);

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);
        thrd_join(thrd, &ret);

        /* Commit the transaction */
        TIMING_START(&timing, "commit_transaction(-1)");
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
        TIMING_STOP(&timing);

        rd_kafka_destroy(rk);

        allowed_error          = RD_KAFKA_RESP_ERR_NO_ERROR;
        test_curr->is_fatal_cb = NULL;

        SUB_TEST_PASS();
}


/**
 * @brief Test that a NULL coordinator is not fatal when
 * the transactional producer reconnects to the txn coordinator
 * and the first thing it does is a FindCoordinatorRequest that
 * fails with COORDINATOR_NOT_AVAILABLE, setting coordinator to NULL.
 */
static void do_test_txn_coordinator_null_not_fatal(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;
        int32_t coord_id             = 1;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int msgcnt                   = 1;
        int remains                  = 0;

        SUB_TEST_QUICK();

        /* Broker down is not a test-failing error */
        allowed_error          = RD_KAFKA_RESP_ERR__TRANSPORT;
        allowed_error_2        = RD_KAFKA_RESP_ERR__TIMED_OUT;
        test_curr->is_fatal_cb = error_is_fatal_cb;
        test_curr->exp_dr_err  = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

        /* One second is the minimum transaction timeout */
        rk = create_txn_producer(&mcluster, transactional_id, 1,
                                 "transaction.timeout.ms", "1000", NULL);

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Makes the produce request timeout. */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coord_id, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_NO_ERROR, 3000);

        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
                                  msgcnt, NULL, 0, &remains);

        /* This value is linked to transaction.timeout.ms, needs enough time
         * so the message times out and a DrainBump sequence is started. */
        rd_kafka_flush(rk, 1000);

        /* To trigger the error the COORDINATOR_NOT_AVAILABLE response
         * must come AFTER idempotent state has changed to WaitTransport
         * but BEFORE it changes to WaitPID. To make it more likely
         * rd_kafka_txn_coord_timer_start timeout can be changed to 5 ms
         * in rd_kafka_txn_coord_query, when unable to query for
         * transaction coordinator.
         */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coord_id, RD_KAFKAP_FindCoordinator, 1,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, 10);

        /* Coordinator down starts the FindCoordinatorRequest loop. */
        TEST_SAY("Bringing down coordinator %" PRId32 "\n", coord_id);
        rd_kafka_mock_broker_set_down(mcluster, coord_id);

        /* Coordinator down for some time. */
        rd_usleep(100 * 1000, NULL);

        /* When it comes up, the error is triggered, if the preconditions
         * happen. */
        TEST_SAY("Bringing up coordinator %" PRId32 "\n", coord_id);
        rd_kafka_mock_broker_set_up(mcluster, coord_id);

        /* Make sure DRs are received */
        rd_kafka_flush(rk, 1000);

        error = rd_kafka_commit_transaction(rk, -1);

        TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        /* Needs to wait some time before closing to make sure it doesn't go
         * into TERMINATING state before error is triggered. */
        rd_usleep(1000 * 1000, NULL);
        rd_kafka_destroy(rk);

        allowed_error          = RD_KAFKA_RESP_ERR_NO_ERROR;
        allowed_error_2        = RD_KAFKA_RESP_ERR_NO_ERROR;
        test_curr->exp_dr_err  = RD_KAFKA_RESP_ERR_NO_ERROR;
        test_curr->is_fatal_cb = NULL;

        SUB_TEST_PASS();
}



/**
 * @brief Simple test to make sure the init_transactions() timeout is honoured
 *        and also not infinite.
 */
static void do_test_txn_resumable_init(void) {
        rd_kafka_t *rk;
        const char *transactional_id = "txnid";
        rd_kafka_error_t *error;
        test_timing_t duration;

        SUB_TEST();

        rd_kafka_conf_t *conf;

        test_conf_init(&conf, NULL, 20);
        test_conf_set(conf, "bootstrap.servers", "");
        test_conf_set(conf, "transactional.id", transactional_id);
        test_conf_set(conf, "transaction.timeout.ms", "4000");

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        /* First make sure a lower timeout is honoured. */
        TIMING_START(&duration, "init_transactions(1000)");
        error = rd_kafka_init_transactions(rk, 1000);
        TIMING_STOP(&duration);

        if (error)
                TEST_SAY("First init_transactions failed (as expected): %s\n",
                         rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "Expected _TIMED_OUT, not %s",
                    error ? rd_kafka_error_string(error) : "success");
        rd_kafka_error_destroy(error);

        TIMING_ASSERT(&duration, 900, 1500);

        TEST_SAY(
            "Performing second init_transactions() call now with an "
            "infinite timeout: "
            "should time out in 2 x transaction.timeout.ms\n");

        TIMING_START(&duration, "init_transactions(infinite)");
        error = rd_kafka_init_transactions(rk, -1);
        TIMING_STOP(&duration);

        if (error)
                TEST_SAY("Second init_transactions failed (as expected): %s\n",
                         rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "Expected _TIMED_OUT, not %s",
                    error ? rd_kafka_error_string(error) : "success");
        rd_kafka_error_destroy(error);

        TIMING_ASSERT(&duration, 2 * 4000 - 500, 2 * 4000 + 500);

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Retries a transaction call until it succeeds or returns a
 *        non-retriable error - which will cause the test to fail.
 *
 * @param intermed_calls Is a block of code that will be called after each
 *                       retriable failure of \p call.
 */
#define RETRY_TXN_CALL__(call, intermed_calls)                                 \
        do {                                                                   \
                rd_kafka_error_t *_error = call;                               \
                if (!_error)                                                   \
                        break;                                                 \
                TEST_SAY_ERROR(_error, "%s: ", "" #call);                      \
                TEST_ASSERT(rd_kafka_error_is_retriable(_error),               \
                            "Expected retriable error");                       \
                TEST_SAY("%s failed, retrying in 1 second\n", "" #call);       \
                rd_kafka_error_destroy(_error);                                \
                intermed_calls;                                                \
                rd_sleep(1);                                                   \
        } while (1)

/**
 * @brief Call \p call and expect it to fail with \p exp_err_code.
 */
#define TXN_CALL_EXPECT_ERROR__(call, exp_err_code)                            \
        do {                                                                   \
                rd_kafka_error_t *_error = call;                               \
                TEST_ASSERT(_error != NULL,                                    \
                            "%s: Expected %s error, got success", "" #call,    \
                            rd_kafka_err2name(exp_err_code));                  \
                TEST_SAY_ERROR(_error, "%s: ", "" #call);                      \
                TEST_ASSERT(rd_kafka_error_code(_error) == exp_err_code,       \
                            "%s: Expected %s error, got %s", "" #call,         \
                            rd_kafka_err2name(exp_err_code),                   \
                            rd_kafka_error_name(_error));                      \
                rd_kafka_error_destroy(_error);                                \
        } while (0)


/**
 * @brief Simple test to make sure short API timeouts can be safely resumed
 *        by calling the same API again.
 *
 * @param do_commit Commit transaction if true, else abort transaction.
 */
static void do_test_txn_resumable_calls_timeout(rd_bool_t do_commit) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        int32_t coord_id             = 1;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int msgcnt                   = 1;
        int remains                  = 0;

        SUB_TEST("%s_transaction", do_commit ? "commit" : "abort");

        rk = create_txn_producer(&mcluster, transactional_id, 1, NULL);

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);

        TEST_SAY("Starting transaction\n");
        TEST_SAY("Delaying first two InitProducerIdRequests by 500ms\n");
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coord_id, RD_KAFKAP_InitProducerId, 2,
            RD_KAFKA_RESP_ERR_NO_ERROR, 500, RD_KAFKA_RESP_ERR_NO_ERROR, 500);

        RETRY_TXN_CALL__(
            rd_kafka_init_transactions(rk, 100),
            TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, -1),
                                    RD_KAFKA_RESP_ERR__CONFLICT));

        RETRY_TXN_CALL__(rd_kafka_begin_transaction(rk), /*none*/);


        TEST_SAY("Delaying ProduceRequests by 3000ms\n");
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coord_id, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_NO_ERROR, 3000);

        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
                                  msgcnt, NULL, 0, &remains);


        TEST_SAY("Delaying SendOffsetsToTransaction by 400ms\n");
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coord_id, RD_KAFKAP_AddOffsetsToTxn, 1,
            RD_KAFKA_RESP_ERR_NO_ERROR, 400);
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 12;
        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        /* This is not a resumable call on timeout */
        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);


        TEST_SAY("Delaying EndTxnRequests by 1200ms\n");
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coord_id, RD_KAFKAP_EndTxn, 1, RD_KAFKA_RESP_ERR_NO_ERROR,
            1200);

        /* Committing/aborting the transaction will also be delayed by the
         * previous accumulated remaining delays. */

        if (do_commit) {
                TEST_SAY("Committing transaction\n");

                RETRY_TXN_CALL__(
                    rd_kafka_commit_transaction(rk, 100),
                    TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, -1),
                                            RD_KAFKA_RESP_ERR__CONFLICT));
        } else {
                TEST_SAY("Aborting transaction\n");

                RETRY_TXN_CALL__(
                    rd_kafka_abort_transaction(rk, 100),
                    TXN_CALL_EXPECT_ERROR__(rd_kafka_commit_transaction(rk, -1),
                                            RD_KAFKA_RESP_ERR__CONFLICT));
        }

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Verify that resuming timed out calls that after the timeout, but
 *        before the resuming call, would error out.
 */
static void do_test_txn_resumable_calls_timeout_error(rd_bool_t do_commit) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_resp_err_t err;
        int32_t coord_id             = 1;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int msgcnt                   = 1;
        int remains                  = 0;
        rd_kafka_error_t *error;

        SUB_TEST_QUICK("%s_transaction", do_commit ? "commit" : "abort");

        rk = create_txn_producer(&mcluster, transactional_id, 1, NULL);

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);

        TEST_SAY("Starting transaction\n");

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
                                  msgcnt, NULL, 0, &remains);


        TEST_SAY("Fail EndTxn fatally after 2000ms\n");
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coord_id, RD_KAFKAP_EndTxn, 1,
            RD_KAFKA_RESP_ERR_INVALID_TXN_STATE, 2000);

        if (do_commit) {
                TEST_SAY("Committing transaction\n");

                TXN_CALL_EXPECT_ERROR__(rd_kafka_commit_transaction(rk, 500),
                                        RD_KAFKA_RESP_ERR__TIMED_OUT);

                /* Sleep so that the background EndTxn fails locally and sets
                 * an error result. */
                rd_sleep(3);

                error = rd_kafka_commit_transaction(rk, -1);

        } else {
                TEST_SAY("Aborting transaction\n");

                TXN_CALL_EXPECT_ERROR__(rd_kafka_commit_transaction(rk, 500),
                                        RD_KAFKA_RESP_ERR__TIMED_OUT);

                /* Sleep so that the background EndTxn fails locally and sets
                 * an error result. */
                rd_sleep(3);

                error = rd_kafka_commit_transaction(rk, -1);
        }

        TEST_ASSERT(error != NULL && rd_kafka_error_is_fatal(error),
                    "Expected fatal error, not %s",
                    rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_code(error) ==
                        RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
                    "Expected error INVALID_TXN_STATE, got %s",
                    rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Concurrent transaction API calls are not permitted.
 *        This test makes sure they're properly enforced.
 *
 * For each transactional API, call it with a 5s timeout, and during that time
 * from another thread call transactional APIs, one by one, and verify that
 * we get an ERR__CONFLICT error back in the second thread.
 *
 * We use a mutex for synchronization, the main thread will hold the lock
 * when not calling an API but release it just prior to calling.
 * The other thread will acquire the lock, sleep, and hold the lock while
 * calling the concurrent API that should fail immediately, releasing the lock
 * when done.
 *
 */

struct _txn_concurrent_state {
        const char *api;
        mtx_t lock;
        rd_kafka_t *rk;
        struct test *test;
};

static int txn_concurrent_thread_main(void *arg) {
        struct _txn_concurrent_state *state = arg;
        static const char *apis[]           = {
            "init_transactions",           "begin_transaction",
            "send_offsets_to_transaction", "commit_transaction",
            "abort_transaction",           NULL};
        rd_kafka_t *rk       = state->rk;
        const char *main_api = NULL;
        int i;

        /* Update TLS variable so TEST_..() macros work */
        test_curr = state->test;

        while (1) {
                const char *api         = NULL;
                const int timeout_ms    = 10000;
                rd_kafka_error_t *error = NULL;
                rd_kafka_resp_err_t exp_err;
                test_timing_t duration;

                /* Wait for other thread's txn call to start, then sleep a bit
                 * to increase the chance of that call has really begun. */
                mtx_lock(&state->lock);

                if (state->api && state->api == main_api) {
                        /* Main thread is still blocking on the last API call */
                        TEST_SAY("Waiting for main thread to finish %s()\n",
                                 main_api);
                        mtx_unlock(&state->lock);
                        rd_sleep(1);
                        continue;
                } else if (!(main_api = state->api)) {
                        mtx_unlock(&state->lock);
                        break;
                }

                rd_sleep(1);

                for (i = 0; (api = apis[i]) != NULL; i++) {
                        TEST_SAY(
                            "Triggering concurrent %s() call while "
                            "main is in %s() call\n",
                            api, main_api);
                        TIMING_START(&duration, "%s", api);

                        if (!strcmp(api, "init_transactions"))
                                error =
                                    rd_kafka_init_transactions(rk, timeout_ms);
                        else if (!strcmp(api, "begin_transaction"))
                                error = rd_kafka_begin_transaction(rk);
                        else if (!strcmp(api, "send_offsets_to_transaction")) {
                                rd_kafka_topic_partition_list_t *offsets =
                                    rd_kafka_topic_partition_list_new(1);
                                rd_kafka_consumer_group_metadata_t *cgmetadata =
                                    rd_kafka_consumer_group_metadata_new(
                                        "mygroupid");
                                rd_kafka_topic_partition_list_add(
                                    offsets, "srctopic4", 0)
                                    ->offset = 12;

                                error = rd_kafka_send_offsets_to_transaction(
                                    rk, offsets, cgmetadata, -1);
                                rd_kafka_consumer_group_metadata_destroy(
                                    cgmetadata);
                                rd_kafka_topic_partition_list_destroy(offsets);
                        } else if (!strcmp(api, "commit_transaction"))
                                error =
                                    rd_kafka_commit_transaction(rk, timeout_ms);
                        else if (!strcmp(api, "abort_transaction"))
                                error =
                                    rd_kafka_abort_transaction(rk, timeout_ms);
                        else
                                TEST_FAIL("Unknown API: %s", api);

                        TIMING_STOP(&duration);

                        TEST_SAY_ERROR(error, "Conflicting %s() call: ", api);
                        TEST_ASSERT(error,
                                    "Expected conflicting %s() call to fail",
                                    api);

                        exp_err = !strcmp(api, main_api)
                                      ? RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS
                                      : RD_KAFKA_RESP_ERR__CONFLICT;

                        TEST_ASSERT(rd_kafka_error_code(error) == exp_err,

                                    "Conflicting %s(): Expected %s, not %s",
                                    api, rd_kafka_err2str(exp_err),
                                    rd_kafka_error_name(error));
                        TEST_ASSERT(
                            rd_kafka_error_is_retriable(error),
                            "Conflicting %s(): Expected retriable error", api);
                        rd_kafka_error_destroy(error);
                        /* These calls should fail immediately */
                        TIMING_ASSERT(&duration, 0, 100);
                }

                mtx_unlock(&state->lock);
        }

        return 0;
}

static void do_test_txn_concurrent_operations(rd_bool_t do_commit) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int32_t coord_id = 1;
        rd_kafka_resp_err_t err;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int remains                  = 0;
        thrd_t thrd;
        struct _txn_concurrent_state state = RD_ZERO_INIT;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST("%s", do_commit ? "commit" : "abort");

        test_timeout_set(90);

        /* We need to override the value of socket.connection.setup.timeout.ms
         * to be at least 2*RTT of the mock broker. This is because the first
         * ApiVersion request will fail, since we make the request with v3, and
         * the mock broker's MaxVersion is 2, so the request is retried with v0.
         * We use the value 3*RTT to add some buffer.
         */
        rk = create_txn_producer(&mcluster, transactional_id, 1,
                                 "socket.connection.setup.timeout.ms", "15000",
                                 NULL);

        /* Set broker RTT to 3.5s so that the background thread has ample
         * time to call its conflicting APIs.
         * This value must be less than socket.connection.setup.timeout.ms/2. */
        rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 3500);

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        /* Set up shared state between us and the concurrent thread */
        mtx_init(&state.lock, mtx_plain);
        state.test = test_curr;
        state.rk   = rk;

        /* We release the lock only while calling the TXN API */
        mtx_lock(&state.lock);

        /* Spin up concurrent thread */
        if (thrd_create(&thrd, txn_concurrent_thread_main, (void *)&state) !=
            thrd_success)
                TEST_FAIL("Failed to create thread");

#define _start_call(callname)                                                  \
        do {                                                                   \
                state.api = callname;                                          \
                mtx_unlock(&state.lock);                                       \
        } while (0)
#define _end_call() mtx_lock(&state.lock)

        _start_call("init_transactions");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
        _end_call();

        /* This call doesn't block, so can't really be tested concurrently. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, 10,
                                  NULL, 0, &remains);

        _start_call("send_offsets_to_transaction");
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 12;
        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);
        _end_call();

        if (do_commit) {
                _start_call("commit_transaction");
                TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
                _end_call();
        } else {
                _start_call("abort_transaction");
                TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
                _end_call();
        }

        /* Signal completion to background thread */
        state.api = NULL;

        mtx_unlock(&state.lock);

        thrd_join(thrd, NULL);

        rd_kafka_destroy(rk);

        mtx_destroy(&state.lock);

        SUB_TEST_PASS();
}


/**
 * @brief KIP-360: Test that fatal idempotence errors triggers abortable
 *        transaction errors, but let the broker-side abort of the
 *        transaction fail with a fencing error.
 *        Should raise a fatal error.
 *
 * @param error_code Which error code EndTxn should fail with.
 *                   Either RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH (older)
 *                   or RD_KAFKA_RESP_ERR_PRODUCER_FENCED (newer).
 */
static void do_test_txn_fenced_abort(rd_kafka_resp_err_t error_code) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 2;
        const char *txnid = "myTxnId";
        char errstr[512];
        rd_kafka_resp_err_t fatal_err;
        size_t errors_cnt;

        SUB_TEST_QUICK("With error %s", rd_kafka_err2name(error_code));

        rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
                                 NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb   = error_is_fatal_cb;
        allowed_error            = RD_KAFKA_RESP_ERR__FENCED;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Fail abort transaction  */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, txn_coord, RD_KAFKAP_EndTxn, 1, error_code, 0);

        /* Fail the PID reinit */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1, error_code, 0);

        /* Produce a message, let it fail with a fatal idempo error. */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Abort the transaction, should fail with a fatal error */
        error = rd_kafka_abort_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY_ERROR(error, "abort_transaction() failed: ");
        TEST_ASSERT(rd_kafka_error_is_fatal(error), "Expected a fatal error");
        rd_kafka_error_destroy(error);

        fatal_err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
        TEST_ASSERT(fatal_err, "Expected a fatal error to have been raised");
        TEST_SAY("Fatal error: %s: %s\n", rd_kafka_err2name(fatal_err), errstr);

        /* Verify that the producer sent the expected number of EndTxn requests
         * by inspecting the mock broker error stack,
         * which should now be empty. */
        if (rd_kafka_mock_broker_error_stack_cnt(
                mcluster, txn_coord, RD_KAFKAP_EndTxn, &errors_cnt)) {
                TEST_FAIL(
                    "Broker error count should succeed for API %s"
                    " on broker %" PRId32,
                    rd_kafka_ApiKey2str(RD_KAFKAP_EndTxn), txn_coord);
        }
        /* Checks all the  RD_KAFKAP_EndTxn responses have been consumed */
        TEST_ASSERT(errors_cnt == 0,
                    "Expected error count 0 for API %s, found %zu",
                    rd_kafka_ApiKey2str(RD_KAFKAP_EndTxn), errors_cnt);

        if (rd_kafka_mock_broker_error_stack_cnt(
                mcluster, txn_coord, RD_KAFKAP_InitProducerId, &errors_cnt)) {
                TEST_FAIL(
                    "Broker error count should succeed for API %s"
                    " on broker %" PRId32,
                    rd_kafka_ApiKey2str(RD_KAFKAP_InitProducerId), txn_coord);
        }
        /* Checks none of the RD_KAFKAP_InitProducerId responses have been
         * consumed
         */
        TEST_ASSERT(errors_cnt == 1,
                    "Expected error count 1 for API %s, found %zu",
                    rd_kafka_ApiKey2str(RD_KAFKAP_InitProducerId), errors_cnt);

        /* All done */
        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}


/**
 * @brief Test that the TxnOffsetCommit op doesn't retry without waiting
 * if the coordinator is found but not available, causing too frequent retries.
 */
static void
do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_bool_t times_out) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        rd_kafka_error_t *error;
        int timeout;

        SUB_TEST_QUICK("times_out=%s", RD_STR_ToF(times_out));

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Wait for messages to be delivered */
        test_flush(rk, 5000);

        /*
         * Fail TxnOffsetCommit with COORDINATOR_NOT_AVAILABLE
         * repeatedly.
         */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_TxnOffsetCommit, 4,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE);

        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 1;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        /* The retry delay is 500ms, with 4 retries it should take at least
         * 2000ms for this call to succeed. */
        timeout = times_out ? 500 : 4000;
        error   = rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata,
                                                       timeout);
        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        if (times_out) {
                TEST_ASSERT(rd_kafka_error_code(error) ==
                                RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
                            "expected %s, got: %s",
                            rd_kafka_err2name(
                                RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE),
                            rd_kafka_err2str(rd_kafka_error_code(error)));
        } else {
                TEST_ASSERT(rd_kafka_error_code(error) ==
                                RD_KAFKA_RESP_ERR_NO_ERROR,
                            "expected \"Success\", found: %s",
                            rd_kafka_err2str(rd_kafka_error_code(error)));
        }
        rd_kafka_error_destroy(error);

        /* All done */
        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


int main_0105_transactions_mock(int argc, char **argv) {
        TEST_SKIP_MOCK_CLUSTER(0);

        do_test_txn_recoverable_errors();

        do_test_txn_fatal_idempo_errors();

        do_test_txn_fenced_reinit(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH);
        do_test_txn_fenced_reinit(RD_KAFKA_RESP_ERR_PRODUCER_FENCED);

        do_test_txn_req_cnt();

        do_test_txn_requires_abort_errors();

        do_test_txn_slow_reinit(rd_false);
        do_test_txn_slow_reinit(rd_true);

        /* Just do a subset of tests in quick mode */
        if (test_quick)
                return 0;

        do_test_txn_endtxn_errors();

        do_test_txn_endtxn_infinite();

        do_test_txn_endtxn_timeout();

        do_test_txn_endtxn_timeout_inflight();

        /* Bring down the coordinator */
        do_test_txn_broker_down_in_txn(rd_true);

        /* Bring down partition leader */
        do_test_txn_broker_down_in_txn(rd_false);

        do_test_txns_not_supported();

        do_test_txns_send_offsets_concurrent_is_retried();

        do_test_txns_send_offsets_non_eligible();

        do_test_txn_coord_req_destroy();

        do_test_txn_coord_req_multi_find();

        do_test_txn_addparts_req_multi();

        do_test_txns_no_timeout_crash();

        do_test_txn_auth_failure(
            RD_KAFKAP_InitProducerId,
            RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);

        do_test_txn_auth_failure(
            RD_KAFKAP_FindCoordinator,
            RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);

        do_test_txn_flush_timeout();

        do_test_unstable_offset_commit();

        do_test_commit_after_msg_timeout();

        do_test_txn_switch_coordinator();

        do_test_txn_switch_coordinator_refresh();

        do_test_out_of_order_seq();

        do_test_topic_disappears_for_awhile();

        do_test_disconnected_group_coord(rd_false);

        do_test_disconnected_group_coord(rd_true);

        do_test_txn_coordinator_null_not_fatal();

        do_test_txn_resumable_calls_timeout(rd_true);

        do_test_txn_resumable_calls_timeout(rd_false);

        do_test_txn_resumable_calls_timeout_error(rd_true);

        do_test_txn_resumable_calls_timeout_error(rd_false);
        do_test_txn_resumable_init();

        do_test_txn_concurrent_operations(rd_true /*commit*/);

        do_test_txn_concurrent_operations(rd_false /*abort*/);

        do_test_txn_fenced_abort(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH);

        do_test_txn_fenced_abort(RD_KAFKA_RESP_ERR_PRODUCER_FENCED);

        do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_true);

        do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_false);

        return 0;
}
